[AWS IoT] Kinesis Firehoseに連携しLambdaまで繋げてみる – (1)概要とKinesis Firehose以降の構築
はじめに
AWS IoTでは受信したメッセージの送信先としてKinesis Firehoseを選択することができます。Kinesis Firehoseでは受信したメッセージをS3やRedshiftに書き込むことができます。今回はAWS IoTで受信したメッセージを、Kinesis Firehose → S3 → Lambdaの順に連携するサンプルを作成しました。図にすると以下のようになります。
AWS IoT〜S3までは先に書いたように各サービスによる連携を、S3からLambdaの連携についてもS3へのオブジェクト配置時にLambdaが起動するように設定しました。
このような構成にした理由としては、以下の様な要件を想定したためです。
- IoTクライアントから送信されるデータをKinesis Firehoseで受信する
- 受信したオリジナルデータは加工せずにS3に貯めておきたい
- Lambdaに連携し、データを加工しRedshiftやDyamoDBに登録したい
サンプルの作成手順(Kinesis Firehose以降)
ではサンプルの作成手順についてです。今回はKinesis Firehose〜Lambdaまでの作成手順について記述します。AWS IoTの作成については次回までお待ち下さい。
0.環境要件
AWS環境の構築にはAWS CLI、LambdaのソースはJava8を使用し、リージョンはオレゴンを選択しました。
1.Lambdaの作成
最初にメッセージを受信するLambdaについてです。S3にファイルを配置時に呼び出されて、そのファイルのパスと内容をログに出力するソースです。
LambdaFunctionHandler.java
package com.classmethod.sample; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.S3Event; import lombok.val; public class LambdaFunctionHandler implements RequestHandler<S3Event, Object> { InputData inputData = null; @Override public Object handleRequest(S3Event input, Context context) { val logger = context.getLogger(); val record = input.getRecords().get(0); logger.log("********** Log Start **********" + "\n"); logger.log(record.getEventName() + "\n"); logger.log(record.getS3().getBucket().getName() + "\n"); logger.log(record.getS3().getObject().getKey() + "\n"); if(inputData == null) inputData = new InputData(); val lines = inputData.read(record.getS3().getBucket().getName(), record.getS3().getObject().getKey()); logger.log("input value = " + "\n"); logger.log(lines); logger.log("\n"); logger.log("********** Log End **********" + "\n"); return lines; } }
InputData.java
package com.classmethod.sample; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.GetObjectRequest; import lombok.val; import lombok.SneakyThrows; public class InputData { public String read(String bucketName, String key){ val s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider()); val s3Object = s3Client.getObject(new GetObjectRequest(bucketName, key)); return readS3File(s3Object.getObjectContent()); } @SneakyThrows private String readS3File(InputStream input){ try(val reader = new BufferedReader(new InputStreamReader(input))){ val lines = new StringBuilder(); while (true) { String line = reader.readLine(); if (line == null) break; lines.append(line + "\n"); } return lines.toString(); } } }
build.gradle
apply plugin: 'java' def defaultEncoding = 'UTF-8' [compileJava, compileTestJava]*.options*.encoding = defaultEncoding group = 'com.classmethod.sample' version = '0.0.1-SNAPSHOT' description = "LambdaS3Read" sourceCompatibility = 1.8 targetCompatibility = 1.8 repositories { jcenter() } dependencies { compile 'com.amazonaws:aws-lambda-java-core:1.0.0' compile 'com.amazonaws:aws-lambda-java-events:1.0.0' compile 'org.projectlombok:lombok:1.16.6' testCompile 'junit:junit:4.11' } jar { from configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
このJavaソースをビルドし、作成したjarをLambda Functionとして登録します。Function名は「LambdaS3Read」、ロールのロールポリシーは以下のようにしました(が、Redshiftへの権限は必要ないですね・・・)。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:*" ], "Resource": "arn:aws:logs:*:*:*" }, { "Action": [ "redshift:*" ], "Effect": "Allow", "Resource": "*" }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ReadObject" ], "Resource": [ "arn:aws:s3:::*" ] } ] }
2.S3のバケット作成
次にS3にバケットを作成し、上記のLambdaを呼び出すようにします。先ずは以下のコマンドでバケットを作成します。
$ aws s3 mb s3://t-honda-kinesis-firehose make_bucket: s3://t-honda-kinesis-firehose/
Lambdaに権限を追加します。
$ aws lambda add-permission --function-name LambdaS3Read --statement-id s3-put-event --action lambda:InvokeFunction --principal s3.amazonaws.com --source-arn arn:aws:s3:::t-honda-kinesis-firehose { "Statement": "{\"Condition\":{\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:s3:::t-honda-kinesis-firehose\"}},\"Action\":[\"lambda:InvokeFunction\"],\"Resource\":\"arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:LambdaS3Read\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"s3.amazonaws.com\"},\"Sid\":\"s3-put-event\"}" }
バケットにオブジェクトを配置時、Lambdaを呼び出すようにします。読み出すLambdaのArn等の定義はnotification_configuration.jsonに記述しています。
$ aws s3api put-bucket-notification-configuration --bucket t-honda-kinesis-firehose --notification-configuration file://notification_configuration.json
notification_configuration.json
{"LambdaFunctionConfigurations": [{"LambdaFunctionArn": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:LambdaS3Read", "Events": ["s3:ObjectCreated:Put"]}]}
3.Kinesisの実行ロール作成
Kinesisを実行するロールを作成します。権限の詳細はassume_role_policy.jsonに定義しています。
$ aws iam create-role --role-name t-honda-kinesis-firehose_role --assume-role-policy-document file://assume_role_policy.json { "Role": { "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [ { "Action": "sts:AssumeRole", "Principal": { "Service": "firehose.amazonaws.com" }, "Effect": "Allow", "Sid": "" } ] }, "RoleId": "AROAIDIXI655PEBY5VUYA", "CreateDate": "2016-02-09T22:42:58.017Z", "RoleName": "t-honda-kinesis-firehose_role", "Path": "/", "Arn": "arn:aws:iam::XXXXXXXXXXXX:role/t-honda-kinesis-firehose_role" } }
assume_role_policy.json
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "Service": "firehose.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
作成したロールにロールポリシーを付与します。ポリシーの詳細はrole_policy.jsonに定義しています。
aws iam put-role-policy --role-name t-honda-kinesis-firehose_role --policy-name KinesisFirehosePolicy --policy-document file://role_policy.json
role_policy.json
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Action": [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::t-honda-kinesis-firehose", "arn:aws:s3:::t-honda-kinesis-firehose/*" ] } ] }
4.Kinesis Firehoseの作成
Kinesis Firehoseを作成します。kinesis_input.jsonのRoleARNに先ほど作成したロールのARNと、BucketARNに送信先のバケットを指定していることに注意してください。
$ aws firehose create-delivery-stream --delivery-stream-name t-honda-kinesis-to-s3 --cli-input-json file://kinesis_input.json { "DeliveryStreamARN": "arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/t-honda-kinesis-to-s3" }
kinesis_input.json
{ "DeliveryStreamName": "", "S3DestinationConfiguration": { "RoleARN": "arn:aws:iam::XXXXXXXXXXXX:role/t-honda-kinesis-firehose_role", "BucketARN": "arn:aws:s3:::t-honda-kinesis-firehose", "Prefix": "", "BufferingHints": { "SizeInMBs": 1, "IntervalInSeconds": 60 }, "CompressionFormat": "UNCOMPRESSED", "EncryptionConfiguration": { "NoEncryptionConfig": "NoEncryption" } } }
まとめ
ここまででKinesis Firehoseで受信したメッセージをS3に配置し、Lambdaを起動するまでを構築することができました。後はクライアントからメッセージを受信するAWS IoTの設定を行うだけです。これについては、次回に記述します。